package com.shujia.sql

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

object Demo5RDD2DataFrame {
  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkSession.builder()
      .master("local")
      .appName("conversion between RDD and DataFrame")
      .config("spark.sql.shuffle.partitions", "1")
      .getOrCreate()

    // Obtain the SparkContext from the SparkSession
    val sparkContext: SparkContext = sparkSession.sparkContext

    // The implicits import serves two purposes:
    // 1. enables the $"colName" column syntax
    // 2. enables conversions between data structures (e.g. RDD.toDF)
    import sparkSession.implicits._

    /**
     * Spark Core's core data structure is the RDD.
     * Spark SQL's core data structure is the DataFrame.
     */
    // RDD->DataFrame  .toDF
    val linesRDD: RDD[String] = sparkContext.textFile("spark/data/students.txt")
    // Parse each CSV line into a 5-field tuple. Note this match is not
    // exhaustive: a line without exactly five fields throws a MatchError
    // (a more defensive variant is sketched below).
    val stuRDD: RDD[(String, String, String, String, String)] = linesRDD.map((line: String) => {
      line.split(",") match {
        case Array(id: String, name: String, age: String, gender: String, clazz: String) =>
          (id, name, age, gender, clazz)
      }
    })
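
    // A more defensive variant (a sketch; safeStuRDD is an illustrative name):
    // RDD.collect with a partial function keeps only lines that split into
    // exactly five fields and silently drops malformed ones.
    // val safeStuRDD: RDD[(String, String, String, String, String)] =
    //   linesRDD.map(_.split(",")).collect {
    //     case Array(id, name, age, gender, clazz) => (id, name, age, gender, clazz)
    //   }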
    // Count students per class: group by the 5th tuple field (clazz),
    // then map each group to (clazz, size).
    val resRDD1: RDD[(String, Int)] = stuRDD.groupBy(_._5)
      .map((kv: (String, Iterable[(String, String, String, String, String)])) => {
        (kv._1, kv._2.size)
      })
    // toDF without arguments names the columns after the tuple fields (_1, _2);
    // rename them with select and the $ syntax from the implicits import.
    val df1: DataFrame = resRDD1.toDF
    val df2: DataFrame = df1.select($"_1" as "clazz", $"_2" as "counts")
    df2.printSchema()
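
    // Two alternative RDD -> DataFrame routes (sketches, not exercised here;
    // the names named/rowRDD/df3 are illustrative):
    // 1. toDF accepts column names directly, avoiding the select/rename step:
    //    val named: DataFrame = resRDD1.toDF("clazz", "counts")
    // 2. createDataFrame with an explicit schema gives full control over types:
    //    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
    //    val schema: StructType = StructType(Seq(
    //      StructField("clazz", StringType, nullable = false),
    //      StructField("counts", IntegerType, nullable = false)
    //    ))
    //    val rowRDD: RDD[Row] = resRDD1.map { case (clazz, counts) => Row(clazz, counts) }
    //    val df3: DataFrame = sparkSession.createDataFrame(rowRDD, schema)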

    // DataFrame->RDD  .rdd
    val resRDD2: RDD[Row] = df2.rdd
    // Option 1 (kept for reference): extract fields by name with row.getAs.
    //    resRDD2.map((row: Row) => {
    //      val clazz: String = row.getAs[String]("clazz")
    //      val counts: Int = row.getAs[Int]("counts")
    //      s"class: $clazz, count: $counts"
    //    }).foreach(println)

    // Option 2: destructure the Row directly with a pattern match.
    // Note: foreach(println) runs on the executors; the output is only
    // visible in this console because the master is local.
    resRDD2.map {
      case Row(clazz: String, counts: Int) =>
        s"class: $clazz, count: $counts"
    }.foreach(println)
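
    // Alternatively (a sketch; typedRDD is an illustrative name): go through a
    // typed Dataset instead of untyped Rows. Tuple encoders resolve columns by
    // the names _1/_2, so df1 (whose columns still carry the default tuple
    // names) is used here rather than the renamed df2.
    // val typedRDD: RDD[(String, Int)] = df1.as[(String, Int)].rdd
    // typedRDD.foreach { case (clazz, counts) => println(s"class: $clazz, count: $counts") }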

    // Release the session's resources once the job is done.
    sparkSession.stop()
  }
}
